 1.Spark原理之Spark优化中编码的优化
   
   1).RDD复用
   避免创建重复的RDD。在开发过程中要注意：对于同一份数据，只应该创建一个RDD，不要创建多个RDD来代表同
一份数据。
   2).RDD缓存/持久化
   当多次对同一个RDD执行算子操作时，每一次都会对这个RDD以之前的父RDD重新计算一次，这种情况是必须
要避免的，对同一个RDD的重复计算是对资源的极大浪费
   对多次使用的RDD进行持久化，通过持久化将公共RDD的数据缓存到内存/磁盘中，之后对于公共RDD的计算都
会从内存/磁盘中直接获取RDD数据
   RDD的持久化是可以进行序列化的，当内存无法将RDD的数据完整的进行存放的时候，可以考虑使用序列化的
方式减小数据体积，将数据完整存储在内存中
   3).巧用 filter
   尽可能早的执行filter操作，过滤无用数据
   在filter过滤掉较多数据后，使用 coalesce 对数据进行重分区
   4).使用高性能算子
   (1)、避免使用groupByKey，根据场景选择使用高性能的聚合算子 reduceByKey、aggregateByKey
   (2)、coalesce、repartition，选择没有shuffle的操作
   (3)、foreachPartition 优化输出操作
   (4)、map、mapPartitions，选择合理的选择算子
   mapPartitions性能更好，但数据量大时容易导致OOM
   (5)、用 repartitionAndSortWithinPartitions 替代 repartition + sort 操作
   (6)、合理使用 cache、persist、checkpoint，选择合理的数据存储级别
   (7)、filter的使用
   (8)、减少对数据源的扫描(算法复杂了)
   5).设置合理的并行度
   Spark作业中的并行度指各个stage的task的数量
   设置合理的并行度，让并行度与资源相匹配。简单来说就是在资源允许的前提下，并行度要设置的尽可能大，
达到可以充分利用集群资源。合理的设置并行度，可以提升整个Spark作业的性能和运行速度
   6).广播大变量
   默认情况下，task中的算子中如果使用了外部变量，每个task都会获取一份变量的复本，这会造多余的网络传输
和内存消耗
   使用广播变量，只会在每个Executor保存一个副本，Executor的所有task共用此广播变量，这样就节约了网络
及内存资源
   7).Kryo序列化
   默认情况下，Spark使用Java的序列化机制。Java的序列化机制使用方便，不需要额外的配置。但Java序列化机
制效率不高，序列化速度慢而且序列化后的数据占用的空间大
   Kryo序列化机制比Java序列化机制性能提高10倍左右。Spark之所以没有默认使用Kryo作为序列化类库，是它不
支持所有对象的序列化，同时Kryo需要用户在使用前注册需要序列化的类型，不够方便。从Spark 2.0 开始，简单
类型、简单类型数组、字符串类型的 Shuffling RDDs 已经默认使用 Kryo 序列化方式
   8).多使用Spark SQL
   Spark SQL 编码更容易，开发更简单
   Spark的优化器对SQL语句做了大量的优化，一般情况下实现同样的功能，Spark SQL更容易也更高效
   9).优化数据结构
   Spark中有三种类型比较消耗内存：
       对象。每个Java对象都有对象头、引用等额外的信息，占用了额外的内存空间
       字符串。每个字符串内部都有一个字符数组以及长度等额外信息
       集合类型。如HashMap、LinkedList等，集合类型内部通常会使用一些内部类来封装集合元素
   Spark官方建议，在编码实现中，特别是对于算子函数中的代码，尽量不要使用上述三种数据结构。尽量使用字符串
替代对象，使用原始类型（比如Int、Long）替代字符串，使用数组替代集合类型，尽可能地减少内存占用，从而降低
GC频率，提升性能。	   
   10).使用高性能库
   fastutil是扩展了Java标准集合框架 (Map、List、Set；HashMap、ArrayList、HashSet) 的类库，提供了特殊类
型的map、set、list和queue
   fastutil能够提供更小的内存占用、更快的存取速度；可使用fastutil提供的集合类，来替代JDK原生的Map、List
、Set。好处在于使用fastutil集合类，可以减小内存占用，在进行集合操作时，提供更快的存取速度

 2.参数优化
   
   1).Shuffle 调优
   2).内存调优
   3).资源优化
   为 Spark 应用程序分配合理的计算资源。
   Spark的资源参数，基本都可以在 spark-submit 命令中作为参数设置
   
   资源配置不合理，会导致Spark作业的运行效率低下，甚至根本无法运行。因此必须对Spark作业的资源使用原理有
一个清晰的认识，并知道在Spark作业运行过程中，有哪些资源参数是可以设置的，以及如何设置合适的参数值。
   
   哪些是资源，给哪些组件分配资源：
       CPU、Memory
       Driver。需要占用一定的资源，缺省情况下系统给Driver分配1个Core，1G内存；内存可以适当调整
	   Executor。每个Executor进程都占有一定数量的内存和core。分配多少个core，意味着允许有多少个 Task 可以
并行执，这些Task共享内存资源
   
   如何估算分配多少资源
       In general, we recommend 2-3 tasks per CPU core in your cluster (Tuning)
       数据量
       计算的类型，迭代计算 OR 统计
       文件格式、处理的字段
./bin/spark-submit \ 
  --master yarn-cluster \ 
  --num-executors 100 \ 
  --executor-memory 10G \ 
  --executor-cores 4 \ 
  --driver-memory 2G \ 
  --conf spark.memory.fraction=0.7   	   
   4).动态资源分配
   动态资源分配(DRA，dynamic resource allocation)
       默认情况下，Spark采用资源预分配的方式。即为每个Spark应用设定一个最大可用资源总量，该应用在整个生
命周期内都会持有这些资源
       Spark提供了一种机制，使它可以根据工作负载动态调整应用程序占用的资源。这意味着，不使用的资源，应用
程序会将资源返回给集群,并在稍后需要时再次请求资源。如果多个应用程序共享Spark集群中的资源,该特性尤为有用
       动态的资源分配是 executor 级
       默认情况下禁用此功能，并在所有粗粒度集群管理器上可用（CDH发行版中默认为true）
       在Spark On Yarn模式下使用：
	       num-executors指定app使用executors数量
           executor-memory、executor-cores指定每个executor所使用的内存、cores
		   
   动态申请executor：
       如果有新任务处于等待状态，并且等待时间超过Spark.dynamicAllocation.schedulerBacklogTimeout(默认
1s)，则会依次启动executor，每次启动1、2、4、8…个executor（如果有的话）。启动的间隔由
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 控制 (默认与schedulerBacklogTimeout相同)
   
   动态移除executor：
       executor空闲时间超过 spark.dynamicAllocation.executorIdleTimeout 设置的值(默认60s)，该executor会被
移除，除非有缓存数据
   
   相关参数：
       spark.dynamicAllocation.enabled = true
       Standalone模式：spark.shuffle.service.enabled = true
       Yarn模式：《Running Spark on YARN》-- Configuring the External Shuffle Service  
   备注：两个参数默认都是false。external shuffle service 的目的是在移除Executor的时候,能够保留Executor输
出的 shuffle 文件。
       spark.dynamicAllocation.executorIdleTimeout（默认60s）。Executor闲置了超过此持续时间，将被删除
       spark.dynamicAllocation.cachedExecutorIdleTimeout（默认infinity）。已缓存数据块的Executor闲置了超
过此持续时间，则该执行器将被删除
       spark.dynamicAllocation.initialExecutors（默认spark.dynamicAllocation.minExecutors）。初始分配
Executor 的个数。如果设置了--num-executors(或spark.executor.instances)并且大于此值,该参数将作为Executor 
初始的个数	
       spark.dynamicAllocation.maxExecutors（默认infinity）。Executor 数量的上限
       spark.dynamicAllocation.minExecutors（默认0）。Executor 数量的下限 
       spark.dynamicAllocation.schedulerBacklogTimeout(默认1s)。任务等待时间超过了此期限，则将请求新的
Executor	   
   5).调节本地等待时长
      Spark总是倾向于让所有任务都具有最佳的数据本地性。遵循移动计算不移动数据的思想，Spark希望task能够
运行在它要计算的数据所在的节点上，这样可以避免数据的网络传输
        PROCESS_LOCAL > NODE_LOCAL > NO_PREF > RACK_LOCAL > ANY
	  在某些情况下，可能会出现一些空闲的executor没有待处理的数据，那么Spark可能就会牺牲一些数据本地
	  如果对应节点资源用尽，Spark会等待一段时间(默认3s)。如果等待指定时间后仍无法在该节点运行，那么自动
降级，尝试将task分配到比较差的本地化级别所对应的节点上；如果当前级别仍然不行，那么继续降级
      调节本地等待时长。如果在等待时间内，目标节点处理完成了一部分 Task，那么等待运行的 Task 将有机会得到
执行，获得较好的数据本地性，提高 Spark 作业整体性能
      根据数据本地性不同，等待的时间间隔也不一致，不同数据本地性的等待时间设置参数
	      spark.locality.wait：设置所有级别的数据本地性，默认是3000毫秒
          spark.locality.wait.process：多长时间等不到PROCESS_LOCAL就降级，默认为${spark.locality.wait}
          spark.locality.wait.node：多长时间等不到NODE_LOCAL就降级，默认为${spark.locality.wait}
          spark.locality.wait.rack：多长时间等不到RACK_LOCAL就降级，默认为${spark.locality.wait}
   6).调节连接等待时长
   在Spark作业运行过程中，Executor优先从自己本地关联的BlockManager中获取某份数据，如果本地BlockManager
没有的话，会通过 TransferService 远程连接其他节点上Executor的BlockManager来获取数据；
   
   在生产环境下，有时会遇到file not found、file lost这类错误。这些错误很有可能是 Executor 的 BlockManager 
在拉取数据的时候，无法建立连接，然后超过默认的连接等待时长后，宣告数据拉取失败。如果反复尝试都拉取不到数
据，可能会导致Spark作业的崩溃。这种情况也可能会导致 DAGScheduler 反复提交几次stage，TaskScheduler返回提
交几次task，延长了Spark作业的运行时间；
   
   此时，可以考虑调节连接的超时时长，设置：spark.core.connection.ack.wait.timeout = 300s (缺省值120s)
   调节连接等待时长后，通常可以避免部分的文件拉取失败、文件丢失等报错。
   
假设：
处理500G的数据文件(未压缩)
100 节点(16 Core, 48 G)
普通的统计运算(不是迭代计算/机器学习)
不能最简单的wordcount

分配多少个资源，如何分配
Driver(1 个)缺省：1Core / 4 G
Executor(多个)

根据经验(xxx)
解答(是起点不是终点)
In general, we recommend 2-3 tasks per CPU core in your cluster (Tuning)
20-30 Task / core   

1).需要多少给个core
500 G / 128 M = 500 * 8 = 4000 个
4000 / 4 = 1000 core
2).在每个节点如何分配Executor
内存是否足够，是否需要分配堆外内存上，堆外内存分配在worker上，executor共享
每个Executor的Core在3-5之间
Executor(5 Core, 15 G)
3). 1000 core/5 = 200 executor
4).如何申请
./bin/spark-submit \ 
  --master yarn-cluster \ 
  --num-executors 100 \ 
  --executor-memory 10G \ 
  --executor-cores 4 \ 
  --driver-memory 2G \ 
  --conf spark.memory.fraction=0.7   

16 Core, 48 G =>
Executor(16 Core, 48 G)XXX
Executor(8 Core, 24 G)
Executor(4 Core, 12 G)
Executor(2 Core, 6 G)XXX

Executor(16 Core, 32 G) + 16 G 堆外内存
Executor(8 Core, 16 G)
Executor(4 Core, 8 G)
Executor(2 Core, 4 G)